package com.yzq.os.spider.v.dao;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import javax.sql.DataSource;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateFormatUtils;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.stereotype.Repository;

import com.yzq.os.spider.v.dao.mapper.SpiderRecordMapper;
import com.yzq.os.spider.v.domain.Record;
import com.yzq.os.spider.v.service.domain.SpiderRecordService;

@Repository
public class SpiderRecordDao extends AbstractDao {

	/** Date suffix appended to per-day crawl tables, e.g. {@code crawl_jobs_3_2014_01_31}. */
	private static final String DATE_SUFFIX_PATTERN = "yyyy_MM_dd";

	/**
	 * Inserts the given crawl records into {@code tableName}, one row at a time.
	 * <p>
	 * Uses {@code insert ignore} plus a swallowed {@link DuplicateKeyException} so
	 * that records whose {@code unique_md5} already exists are silently skipped
	 * without aborting the remaining inserts. Rows are executed individually (not
	 * as a JDBC batch) on purpose: a duplicate must only skip its own row, never
	 * the rest of the list.
	 * <p>
	 * NOTE: {@code tableName} is concatenated into the SQL because JDBC cannot
	 * parameterize identifiers — callers must pass only trusted, internally
	 * generated table names.
	 *
	 * @param crawlDate value stored in the {@code spider_date} column for every row
	 * @param tableName target table (created by {@link #createTable(String)})
	 * @param jobInfos  records to insert; a null or empty list is a no-op
	 */
	public void batchSave(Date crawlDate, String tableName, final List<Record> jobInfos) {
		if (CollectionUtils.isEmpty(jobInfos)) {
			return;
		}

		// StringBuilder: local, single-threaded — no need for StringBuffer's locking.
		StringBuilder sql = new StringBuilder();
		sql.append(" insert ignore into `" + tableName + "` ");
		sql.append("   (search_engine_id, ");
		sql.append("    website_id, ");
		sql.append("    query_url_id, ");
		sql.append("    job_title, ");
		sql.append("    company_name, ");
		sql.append("    cmp_company_id, ");
		sql.append("    city_text, ");
		sql.append("    job_date, ");
		sql.append("    job_type_code, ");
		sql.append("    job_industry_code, ");
		sql.append("    job_city_code, ");
		sql.append("    job_link_url, ");
		sql.append("    do_flag, ");
		sql.append("    spider_date, ");
		sql.append("    cmptr_job_id, ");
		sql.append("    unique_md5, ");
		sql.append("    job_salary, ");
		sql.append("    job_type, ");
		sql.append("    company_link_url) ");
		sql.append(" values ");
		sql.append("   (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ");

		for (Record job : jobInfos) {
			// Parameter order must match the column list above exactly.
			List<Object> params = new ArrayList<Object>();
			params.add(job.getSearchEngineId());
			params.add(job.getWebsiteId());
			params.add(job.getQueryUrlId());
			params.add(job.getJobTitle());
			params.add(job.getCompanyName());
			params.add(job.getCmpCompanyId());
			params.add(job.getCityText());
			params.add(job.getJobDate());
			params.add(job.getJobTypeCode());
			params.add(job.getIndustryCode());
			params.add(job.getCityCode());
			params.add(job.getJobLinkURL());
			params.add(job.getDoFlag());
			params.add(crawlDate);
			params.add(job.getCmptrJobId());
			params.add(job.getUniqueMd5());
			params.add(job.getJobSalary());
			params.add(job.getJobType());
			params.add(job.getCompanyLinkURL());
			try {
				jdbcTemplateOnline.update(sql.toString(), params.toArray());
			} catch (DuplicateKeyException ignored) {
				// Duplicates are expected (same job crawled twice); skip this row only.
			}
		}
	}

	/**
	 * Sets {@code do_flag = targetDoFlag} for every record in {@code crawJobs},
	 * keyed by {@code job_id}, as a single JDBC batch.
	 *
	 * @param tableName    table holding the records
	 * @param crawJobs     records whose ids identify the rows to update
	 * @param targetDoFlag new flag value
	 */
	public void updateSplitDoFlag(String tableName, final List<Record> crawJobs, final int targetDoFlag) {
		String sql = "update `" + tableName + "` t set t.do_flag = ? where t.job_id = ?";

		jdbcTemplateOnline.batchUpdate(sql, new BatchPreparedStatementSetter() {

			@Override
			public void setValues(PreparedStatement ps, int index) throws SQLException {
				Record crawJob = crawJobs.get(index);
				ps.setInt(1, targetDoFlag);
				ps.setInt(2, crawJob.getId());
			}

			@Override
			public int getBatchSize() {
				return crawJobs.size();
			}
		});
	}

	/**
	 * Returns one statistics row {engineId, doFlag, count, tableName} for
	 * {@code tableName}, or {@code null} if the table does not exist or is empty.
	 * <p>
	 * NOTE(review): the query groups by (search_engine_id, do_flag) and may yield
	 * several rows, but only the first (lowest do_flag) is returned — confirm that
	 * callers only need that first group.
	 *
	 * @param engineId  search-engine id echoed into slot 0 of the result
	 * @param tableName table to aggregate
	 * @return {@code [engineId, doFlag, count, tableName]} or {@code null}
	 */
	public String[] procStatistics(final int engineId, final String tableName) {
		if (isExistTableOnlineDatabase(tableName)) {
			StringBuilder sql = new StringBuilder();
			sql.append(" select search_engine_id, do_flag, count(*) count_job ");
			sql.append("   from `" + tableName + "` ");
			sql.append("  group by search_engine_id, do_flag ");
			sql.append("  order by search_engine_id, do_flag ");

			logger.debug("SQL:[" + sql.toString() + "]");

			List<String[]> list = jdbcTemplateOnline.query(sql.toString(), new RowMapper<String[]>() {

				@Override
				public String[] mapRow(ResultSet rs, int index) throws SQLException {

					int doFlag = rs.getInt("do_flag");
					int countJob = rs.getInt("count_job");

					String[] strs = new String[4];
					strs[0] = String.valueOf(engineId);
					strs[1] = String.valueOf(doFlag);
					strs[2] = String.valueOf(countJob);
					strs[3] = tableName;

					return strs;
				}
			});

			if (CollectionUtils.isNotEmpty(list)) {
				return list.get(0);
			}
		}
		return null;

	}

	/**
	 * Lists existing per-website crawl tables for {@code searchEngineId}, newest
	 * first, by pattern-matching {@code information_schema.tables}. Underscores in
	 * the LIKE pattern are escaped ({@code \_}) so they match literally.
	 * <p>
	 * NOTE(review): no {@code table_schema} filter — on a server hosting several
	 * databases this may match tables outside the current schema; verify.
	 *
	 * @param searchEngineId engine whose tables to list
	 * @return matching table names, descending
	 */
	public List<String> findExistenceWebsiteTableNames(int searchEngineId) {
		StringBuilder sql = new StringBuilder();
		sql.append(" select table_name ");
		sql.append("   from information_schema.tables ");
		sql.append("  where table_name like '");
		sql.append(SpiderRecordService.BASE_TABLE_NAME_PREFIX);
		sql.append("\\_");
		sql.append(searchEngineId);
		sql.append("\\_");
		sql.append("%' ");
		sql.append("  order by table_name desc ");

		return jdbcTemplateOnline.queryForList(sql.toString(), String.class);
	}

	/**
	 * Lists existing tables whose names start with {@code baseTableNamePrefix},
	 * newest first.
	 * <p>
	 * NOTE(review): as in {@link #findExistenceWebsiteTableNames(int)}, there is
	 * no {@code table_schema} filter, and the prefix is not LIKE-escaped.
	 *
	 * @param baseTableNamePrefix literal prefix to match
	 * @return matching table names, descending
	 */
	public List<String> findExistJobSaveTableNames(String baseTableNamePrefix) {
		StringBuilder sql = new StringBuilder();
		sql.append(" select table_name ");
		sql.append("   from information_schema.tables ");
		sql.append("  where table_name like '");
		sql.append(baseTableNamePrefix);
		sql.append("%' ");
		sql.append("  order by table_name desc ");

		return jdbcTemplateOnline.queryForList(sql.toString(), String.class);
	}

	/**
	 * Creates a crawl-record table named {@code tableName} (MyISAM, utf8) with a
	 * unique key on {@code unique_md5} — this is what makes the
	 * {@code insert ignore} in {@link #batchSave} skip duplicates.
	 *
	 * @param tableName name of the table to create; must be a trusted identifier
	 */
	public void createTable(String tableName) {
		StringBuilder sql = new StringBuilder();
		sql.append(" create table `" + tableName + "` ( ");
		sql.append("   `job_id` int(11) not null auto_increment, ");
		sql.append("   `search_engine_id` int(11) not null, ");
		sql.append("   `website_id` int(11) not null, ");
		sql.append("   `query_url_id` int(11) not null, ");
		sql.append("   `job_title` varchar(512) not null, ");
		sql.append("   `company_name` varchar(512) not null, ");
		sql.append("   `cmp_company_id` varchar(45) null, ");
		sql.append("   `city_text` varchar(2048) not null, ");
		sql.append("   `job_date` date not null, ");
		sql.append("   `job_type_code` varchar(100) not null, ");
		sql.append("   `job_industry_code` varchar(200) not null, ");
		sql.append("   `job_city_code` varchar(200) not null, ");
		sql.append("   `job_salary` varchar(200) null, ");
		sql.append("   `job_type` varchar(200) null, ");
		sql.append("   `job_link_url` varchar(1024) not null, ");
		sql.append("   `company_link_url` varchar(1024) not null, ");
		sql.append("   `unique_md5` varchar(45) not null, ");
		sql.append("   `do_flag` int(1) not null, ");
		sql.append("   `spider_date` date not null, ");
		sql.append("   `cmptr_job_id` varchar(128) null, ");
		sql.append("   primary key  (`job_id`), ");
		sql.append("   unique key `unique_md5` (`unique_md5`), ");
		sql.append("   key `engine_doflag` (`search_engine_id`,`do_flag`), ");
		sql.append("   key `idx_company_name` (`company_name`) ");
		sql.append(" ) engine=MyISAM default charset=utf8 ");

		logger.info("Create tableName:[" + tableName + "],SQL:[" + sql.toString() + "]");

		jdbcTemplateOnline.execute(sql.toString());
	}

	/**
	 * Loads up to {@code maxReturn} records from {@code tableName} in table order
	 * (no explicit ORDER BY).
	 *
	 * @param tableName table to read
	 * @param maxReturn maximum number of rows to return
	 * @return mapped records, possibly empty
	 */
	public List<Record> findRecords(String tableName, int maxReturn) {
		StringBuilder sql = new StringBuilder();
		sql.append(" select t.job_id, ");
		sql.append("        t.search_engine_id, ");
		sql.append("        t.website_id, ");
		sql.append("        t.query_url_id, ");
		sql.append("        t.job_title, ");
		sql.append("        t.company_name, ");
		sql.append("        t.cmp_company_id, ");
		sql.append("        t.city_text, ");
		sql.append("        t.job_date, ");
		sql.append("        t.job_type_code, ");
		sql.append("        t.job_industry_code, ");
		sql.append("        t.job_link_url, ");
		sql.append("        t.company_link_url, ");
		sql.append("        t.job_city_code, ");
		sql.append("        t.do_flag, ");
		sql.append("        t.cmptr_job_id, ");
		sql.append("        t.unique_md5, ");
		sql.append("        t.job_salary, ");
		sql.append("        t.job_type ");
		sql.append("   from `" + tableName + "` t  limit 0, ? ");

		return jdbcTemplateOnline.query(sql.toString(), new Object[] { maxReturn }, new SpiderRecordMapper());
	}

	/** Lists per-day crawl tables for {@code date} in the online database. */
	public List<String> findOnlineJobTableNames(Date date) {
		return findJobTableNames(date, jdbcTemplateOnline);
	}

	/** Lists per-day crawl tables for {@code date} in the offline database. */
	public List<String> findOfflineJobTableNames(Date date) {
		return findJobTableNames(date, jdbcTemplateOffline);
	}

	/**
	 * Shared lookup for {@code crawl_jobs_<engine>_<yyyy_MM_dd>} tables on the
	 * given template's database.
	 */
	private List<String> findJobTableNames(Date date, JdbcTemplate jt) {
		StringBuilder sql = new StringBuilder();
		sql.append(" select table_name ");
		sql.append("   from information_schema.tables ");
		sql.append("  where table_name like 'crawl_jobs_%_" + DateFormatUtils.format(date, DATE_SUFFIX_PATTERN) + "' ");

		return jt.queryForList(sql.toString(), String.class);
	}

	/**
	 * Returns {@code true} when the online and offline data sources point at the
	 * same JDBC URL (case-insensitive compare).
	 * <p>
	 * Connections are released in {@code finally} blocks so neither leaks when
	 * reading the metadata throws.
	 *
	 * @return whether both templates target the same database URL
	 * @throws SQLException if a connection's metadata cannot be read
	 */
	public boolean checkJdbcConnectionIsSame() throws SQLException {
		DataSource onSource = jdbcTemplateOnline.getDataSource();
		DataSource offSource = jdbcTemplateOffline.getDataSource();
		Connection conOnline = DataSourceUtils.getConnection(onSource);
		try {
			Connection conOffline = DataSourceUtils.getConnection(offSource);
			try {
				String onUrl = conOnline.getMetaData().getURL();
				String offUrl = conOffline.getMetaData().getURL();
				return StringUtils.equalsIgnoreCase(onUrl, offUrl);
			} finally {
				DataSourceUtils.releaseConnection(conOffline, offSource);
			}
		} finally {
			DataSourceUtils.releaseConnection(conOnline, onSource);
		}
	}

	/**
	 * Runs an arbitrary caller-supplied count query on the online database.
	 * <p>
	 * NOTE(review): executes {@code sql} verbatim — callers must never pass
	 * user-controlled SQL.
	 *
	 * @param sql a query returning a single integer
	 * @return the integer result
	 */
	public int countBySql(String sql) {
		return jdbcTemplateOnline.queryForInt(sql);
	}

	/**
	 * Returns the distinct company names stored in {@code jobSaveTableName}.
	 * The table name is backtick-quoted for consistency with the other queries
	 * in this class.
	 *
	 * @param jobSaveTableName table to read; must be a trusted identifier
	 * @return distinct company names
	 */
	public List<String> findAllCompanyNames(String jobSaveTableName) {
		String sql = "select distinct company_name from `" + jobSaveTableName + "`";
		logger.info("sql:[" + sql + "]");
		return jdbcTemplateOnline.queryForList(sql, String.class);
	}

}
